本次的程式碼與目錄結構可以參考 FastAPI Tutorial : Day20 branch
我們在
今天我們會完成整體 OAuth2 Authentication 的實作
把各個部分串起來
並修好細節 !
在 PUT /user/{user_id}
與 PUT /user/{user_id}/password
的時候
這兩個 API 都會更新密碼
應該改為 PUT /user/{user_id}/password
這個 API 才會更新密碼比較合適
schemas/user.py
# ...
class UserUpdate(UserBase):
#password: Optional[str] = Field(min_length=6) # <------ 移除
avatar: Optional[str] = None
age: Optional[int] = Field(gt=0,lt=100)
birthday: Optional[date] = Field()
# ...
api/user.py
@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
# ...
# newUser.password = get_password_hash(newUser.password) # <------ 移除
await UserCrud.update_user(newUser,user_id)
return newUser
crud/user.py
# ...
async def update_user(self,newUser: UserSchema.UserUpdate,user_id:int, db_session:AsyncSession=None):
stmt = update(UserModel).where(UserModel.id == user_id).values(
# password=newUser.password, # <------ 新增
name=newUser.name,
age=newUser.age,
birthday=newUser.birthday,
avatar=newUser.avatar
)
在 create_user
、 update_user_password
這兩個 API 中
將明碼密碼改成 hash 過的密碼都是寫在 api/user.py
中
應該把這部分的邏輯寫在 crud/user.py
中
api/users.py
# ...
async def create_user(newUser: UserSchema.UserCreate ):
# ...
# newUser.password = get_password_hash(newUser.password) # <------ 移除
user = await UserCrud.create_user(newUser)
return vars(user)
async def update_user_password(newUser:UserSchema.UserUpdatePassword,user_id:int=Depends(check_user_id)):
# newUser.password = get_password_hash(newUser.password) # <------ 移除
await UserCrud.update_user_password(newUser,user_id)
return
crud/user.py
async def create_user(self,newUser: UserSchema.UserCreate, db_session:AsyncSession=None ):
user = UserModel(
name=newUser.name,
password=get_password_hash(newUser.password), # <------ 新增
# ...
)
# ...
return user
# ...
async def update_user_password(self,newUser: UserSchema.UserUpdatePassword,user_id:int, db_session:AsyncSession=None):
stmt = update(UserModel).where(UserModel.id == user_id).values(
password=get_password_hash(newUser.password), # <------ 新增
)
# ...
我們在 Day19 只有為 update_user
這個 API 加上了權限驗證
接下來加上 CurrentUser
Schema 確保傳遞正確
並為所有需要驗證權限的 API 加上 get_current_user
dependency
PUT /user/{user_id}
PUT /user/{user_id}/password
DELETE /user/{user_id}
schemas/user.py
class CurrentUser(BaseModel):
id: int
name: str
email: str
api/user.py
# ...
async def update_user_password(
newUser:UserSchema.UserUpdatePassword,
user_id:int=Depends(check_user_id),
current_user:UserSchema.CurrentUser=Depends(get_current_user) # <------ 新增
):
# ...
if current_user.id != user_id: # <------ 新增
raise Exception403
# ...
# ...
async def delete_users(
user_id:int = Depends(check_user_id),
user:UserSchema.CurrentUser = Depends(get_current_user) # <------ 新增
):
# ...
if user.id != user_id:
raise Exception403
# ...
我們為 GET /items/{item_id}
加上 ItemInfor
Schema
再新增 UpdateItem
Schema
schemas/item.py
# ...
class ItemInfor(ItemRead):
brand: str
description: Optional[str] = None
class ItemUpdate(ItemBase):
name: Optional[str] = None
price: Optional[float] = None
brand: Optional[str] = None
description: Optional[str] = None
# ...
我們從 CRUD 開始的範例都以 User 為主
所以 Item 的 CRUD 只需要照著 User 的 CRUD 做一些修改就可以了
先 import 相關的 Schema 、 Model
和 SQLAlchemy 的操作、AsyncSession
與 crud_class_decorator
crud/item.py
@crud_class_decorator
class ItemCrudManager:
async def get_item(self, item_id: int):
stmt = select(ItemModel).where(ItemModel.id == item_id)
result = await self.db_session.execute(stmt)
return result.scalars().first()
async def get_items(self, skip: int = 0, limit: int = 100):
stmt = select(ItemModel).offset(skip).limit(limit)
result = await self.db_session.execute(stmt)
return result.scalars().all()
async def create_item(self, newItem: ItemSchema.ItemCreate):
item = ItemModel(
name=newItem.name,
price=newItem.price,
brand=newItem.brand,
description=newItem.description
)
self.db_session.add(item)
await self.db_session.commit()
await self.db_session.refresh(item)
return item
async def update_item(self, newItem: ItemSchema.ItemUpdate, item_id: int):
stmt = update(ItemModel).where(ItemModel.id == item_id).values(
name=newItem.name,
price=newItem.price,
brand=newItem.brand,
description=newItem.description
)
await self.db_session.execute(stmt)
await self.db_session.commit()
return
async def delete_item(self, item_id: int):
stmt = delete(ItemModel).where(ItemModel.id == item_id)
await self.db_session.execute(stmt)
await self.db_session.commit()
return
接下來要為 Item 加上權限管理
避免 Item 被不屬於自己的 User 修改、刪除
所以一樣可以透過我們的 get_current_user
來取得當前的 User
和 check_item_id
來確認 Item 是否存在
來檢查 item.user_id
是否等於 current_user.id
api/item.py
@router.put("/items/{item_id}" , response_model=ItemUpdate)
async def update_items(
updateItem: ItemUpdate,
item:CurrentItem = Depends(check_item_id),
user:CurrentUser = Depends(get_current_user)):
if item.user_id != user.id:
raise HTTPException(status_code=403, detail="Forbidden")
item = await ItemCrud.update_item_by_id(item.id,updateItem)
return item
@router.delete("/items/{item_id}")
async def delete_items(
item:CurrentItem = Depends(check_item_id),
user:CurrentUser = Depends(get_current_user)):
if item.user_id != user.id:
raise HTTPException(status_code=403, detail="Forbidden")
await ItemCrud.delete_item_by_id(item.id)
透過 Depends
來取得 item
和 user
只需要額外加上一行判斷就可以完成權限管理!
當我們以 user1
的身份來修改 test
的 Item 時
就會回傳 403 Permissison Denied
延續著 User 與 Item 的 CRUD
我們完成了整個 OAuth2 Authentication 的實作
並且加上了權限管理
快看到需要驗證權限的 API 都有 lock
的圖示了!